package com.process;


import canal.bean.RowData;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.base.MySqlBaseETL;
import com.bean.*;
import com.utils.ConfigReader;
import com.utils.RedisUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

/**
 * Synchronizes dimension-table changes into Redis.
 *
 * <p>TODO (original author's note, left unfinished): this requirement needs ...
 *
 * <p>Description: synchronize dimension data.
 *
 * @author Sky
 * @since 2021/8/15 21:27
 */
public class SyncDimData {

    /** Redis logical database index selected before every dimension read/write. */
    private static final int DIM_REDIS_DB = 1;

    // Source table names that are treated as dimension tables.
    private static final String TABLE_GOODS = "foo_goods";
    private static final String TABLE_SHOPS = "foo_shops";
    private static final String TABLE_GOODS_CATS = "foo_goods_cats";
    private static final String TABLE_ORG = "foo_org";
    private static final String TABLE_SHOP_CATS = "foo_shop_cats";

    // Redis hash keys, one hash per dimension table.
    private static final String KEY_GOODS = "foo_shop:dim_goods";
    private static final String KEY_SHOPS = "foo_shop:dim_shops";
    private static final String KEY_GOODS_CATS = "foo_shop:dim_goods_cats";
    private static final String KEY_ORG = "foo_shop:dim_org";
    private static final String KEY_SHOP_CATS = "foo_shop:dim_shop_cats";

    /**
     * Wires the dimension-sync pipeline onto the given environment.
     *
     * <p>Steps:
     * <ol>
     *   <li>Obtain the row-change stream from the configured Canal topic.</li>
     *   <li>Keep only rows that belong to a dimension table.</li>
     *   <li>Apply every remaining change to Redis (insert/update writes the entry,
     *       delete removes it).</li>
     * </ol>
     *
     * @param env the Flink streaming environment the source is attached to
     * @throws Exception declared for compatibility with existing callers
     */
    public static void process(StreamExecutionEnvironment env) throws Exception {

        MySqlBaseETL mysql = new MySqlBaseETL();
        DataStream<RowData> canalDataStream = mysql.KafkaConsummer(ConfigReader.input_topic_canal, env);

        // 2: keep only the dimension tables.
        DataStream<RowData> dimRowDataStream = canalDataStream.filter(new FilterFunction<RowData>() {
            @Override
            public boolean filter(RowData rowData) throws Exception {
                // Null rows / null table names are dropped instead of failing the job.
                return rowData != null && isDimTable(rowData.getTableName());
            }
        });

        // 3: apply the synchronized changes to Redis.
        dimRowDataStream.addSink(new RichSinkFunction<RowData>() {
            // transient: the connection is created in open() on the worker and must never be
            // part of the serialized function.
            private transient Jedis jedis;

            @Override
            public void open(Configuration parameters) throws Exception {
                jedis = RedisUtil.getJedis().getResource();
                jedis.select(DIM_REDIS_DB);
            }

            @Override
            public void close() throws Exception {
                // open() may have failed before the field was assigned; also always hand the
                // resource back, even when the socket is no longer connected.
                if (jedis != null) {
                    jedis.close();
                    jedis = null;
                }
            }

            @Override
            public void invoke(RowData rowData, Context context) {
                // Reuse the connection owned by this sink instead of borrowing a new one
                // for every record.
                String type = rowData.getEventType();
                if ("insert".equals(type) || "update".equals(type)) {
                    writeDimData(jedis, rowData);
                } else if ("delete".equals(type)) {
                    removeDimData(jedis, rowData);
                }
            }
        });
    }


    /**
     * Writes (inserts or overwrites) the Redis entry for one dimension row.
     *
     * <p>A connection is borrowed for the duration of the call and always released,
     * so the method can be called repeatedly without leaking connections.
     *
     * @param rowData the changed row; rows of non-dimension tables are ignored
     * @throws NumberFormatException if a numeric column of the row is missing or not parseable
     */
    public static void updateDimData(RowData rowData) {
        try (Jedis jedis = RedisUtil.getJedis().getResource()) {
            jedis.select(DIM_REDIS_DB);
            writeDimData(jedis, rowData);
        }
    }

    /**
     * Removes the Redis entry for one deleted dimension row.
     *
     * <p>A connection is borrowed for the duration of the call and always released.
     *
     * @param rowData the deleted row; rows of non-dimension tables are ignored
     */
    public static void deleteDimData(RowData rowData) {
        try (Jedis jedis = RedisUtil.getJedis().getResource()) {
            jedis.select(DIM_REDIS_DB);
            removeDimData(jedis, rowData);
        }
    }

    /** Returns whether the given table name is one of the synchronized dimension tables. */
    private static boolean isDimTable(String tableName) {
        if (tableName == null) {
            return false;
        }
        switch (tableName) {
            case TABLE_GOODS:
            case TABLE_SHOPS:
            case TABLE_GOODS_CATS:
            case TABLE_ORG:
            case TABLE_SHOP_CATS:
                return true;
            default:
                return false;
        }
    }

    /** Reads one column value of the row by column name. */
    private static String column(RowData rowData, String name) {
        return rowData.getColumns().get(name);
    }

    /** Serializes the row into its dimension entity and stores it with the caller's connection. */
    private static void writeDimData(Jedis jedis, RowData rowData) {
        String tableName = rowData.getTableName();
        if (tableName == null) {
            return;
        }
        switch (tableName) {
            case TABLE_GOODS: {
                String goodsId = column(rowData, "goodsId");
                String goodsName = column(rowData, "goodsName");
                String shopId = column(rowData, "shopId");
                String goodsCatId = column(rowData, "goodsCatId");
                String shopPrice = column(rowData, "shopPrice");
                DimGoodsDBEntity dimGoods = new DimGoodsDBEntity(Long.valueOf(goodsId), goodsName, Long.valueOf(shopId), Integer.parseInt(goodsCatId), Double.valueOf(shopPrice));
                jedis.hset(KEY_GOODS, goodsId, JSON.toJSONString(dimGoods));
                System.out.println("foo_shop:dim_goods-----更新成功");
                break;
            }
            case TABLE_SHOPS: {
                String shopId = column(rowData, "shopId");
                String areaId = column(rowData, "areaId");
                String shopName = column(rowData, "shopName");
                String shopCompany = column(rowData, "shopCompany");
                DimShopsDBEntity dimShop = new DimShopsDBEntity(Integer.parseInt(shopId), Integer.parseInt(areaId), shopName, shopCompany);
                jedis.hset(KEY_SHOPS, shopId, JSON.toJSONString(dimShop, SerializerFeature.DisableCircularReferenceDetect));
                System.out.println("foo_shop:dim_shops-----更新成功");
                break;
            }
            case TABLE_GOODS_CATS: {
                String catId = column(rowData, "catId");
                String parentId = column(rowData, "parentId");
                String catName = column(rowData, "catName");
                String catLevel = column(rowData, "cat_level");
                DimGoodsCatDBEntity entity = new DimGoodsCatDBEntity(catId, parentId, catName, catLevel);
                jedis.hset(KEY_GOODS_CATS, catId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect));
                System.out.println("foo_shop:dim_goods_cats-----更新成功");
                break;
            }
            case TABLE_ORG: {
                String orgId = column(rowData, "orgId");
                String parentId = column(rowData, "parentId");
                String orgName = column(rowData, "orgName");
                String orgLevel = column(rowData, "orgLevel");
                DimOrgDBEntity entity = new DimOrgDBEntity(Integer.valueOf(orgId), Integer.valueOf(parentId), orgName, Integer.valueOf(orgLevel));
                jedis.hset(KEY_ORG, orgId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect));
                System.out.println("foo_shop:dim_org-----更新成功");
                break;
            }
            case TABLE_SHOP_CATS: {
                // Store-level goods category dimension.
                String catId = column(rowData, "catId");
                String parentId = column(rowData, "parentId");
                String catName = column(rowData, "catName");
                // NOTE(review): the fourth constructor argument is read from "catSort" here,
                // while the goods-category branch reads "cat_level" - confirm this is intended.
                String catSort = column(rowData, "catSort");
                DimShopCatDBEntity entity = new DimShopCatDBEntity(catId, parentId, catName, catSort);
                jedis.hset(KEY_SHOP_CATS, catId, JSON.toJSONString(entity, SerializerFeature.DisableCircularReferenceDetect));
                System.out.println("foo_shop:dim_shop_cats-----更新成功");
                break;
            }
            default:
                // Not a dimension table: nothing to write.
                break;
        }
    }

    /** Deletes the hash field that belongs to the row, using the caller's connection. */
    private static void removeDimData(Jedis jedis, RowData rowData) {
        String tableName = rowData.getTableName();
        if (tableName == null) {
            return;
        }
        final String hashKey;
        final String idColumn;
        switch (tableName) {
            case TABLE_GOODS:
                hashKey = KEY_GOODS;
                idColumn = "goodsId";
                break;
            case TABLE_SHOPS:
                hashKey = KEY_SHOPS;
                idColumn = "shopId";
                break;
            case TABLE_GOODS_CATS:
                hashKey = KEY_GOODS_CATS;
                idColumn = "catId";
                break;
            case TABLE_ORG:
                hashKey = KEY_ORG;
                idColumn = "orgId";
                break;
            case TABLE_SHOP_CATS:
                hashKey = KEY_SHOP_CATS;
                idColumn = "catId";
                break;
            default:
                // Not a dimension table: nothing to delete.
                return;
        }
        jedis.hdel(hashKey, column(rowData, idColumn));
        System.out.println(hashKey + "-----删除成功");
    }


}
